;; Original work by Amirouche.
;; Some of the comments or docstrings by Zelphir.

;; pool of workers that can be used to execute blocking operation in a
;; fibers application.
;;
;; TODO: maybe it will be better to re-base this module on `future` or
;; `promise` object.
(define-module (babelia pool))

(import (ice-9 match))
(import (ice-9 q))
(import (ice-9 threads))
(import (srfi srfi-9))
(import (srfi srfi-1))
(import (fibers))
(import (fibers channels))
(import (fibers operations))
(import (babelia thread))
(import (babelia okvs ulid))
(import (babelia log))


(define %channel #f)

(define worker-count (- (current-processor-count) 1))

(define (worker channel)
  (parameterize ((thread-index (random-bytes 2)))
    (let ((worker (make-channel)))
      (let loop ()
        (put-message channel (cons 'worker worker))
        (let* ((work (get-message worker))
               (thunk (car work))
               (return (cdr work))
               ;; Execute thunk and send the returned value.  XXX: To be able
               ;; to keep track of jobs, the channel called `return`, is put
               ;; in itself.  See procedure pool-for-each-par-map.

               ;; TODO: add a call-with-values
               (out (thunk)))
          (put-message return (cons return out)))
        (loop)))))

(define (arbiter channel)
  "The arbiter is actually a loop, which looks on a given channel for
messages. If the message is a worker, that means, that the worker is
ready to receive more work to perform. If the message is work, it
means, that this is work to be distributed to the workers. If a worker
is ready and there is work, work is given to the worker. Workers and
work are managed in queues. The queues are modified with mutating
operations. Workers report as ready by putting a worker on the
distribution channel."
  (let ((works (make-q))
        (workers (make-q)))
    ;; Look for messages on the given channel.
    (let loop ((message (get-message channel)))
      ;; Match messages.
      (match message
        ;; If the message is a worker, try to find work for it and
        ;; give that work to the worker, to get it done.
        (('worker . worker)
         (if (q-empty? works)
             (enq! workers worker)
             ;; If work can be given to the worker, take the work out
             ;; of the queue (it shall only be done by one worker) and
             ;; give it to the worker.
             (let ((work (deq! works)))
               (put-message worker work))))
        ;; If the message is work, try to find a worker, which is
        ;; ready and give that worker the work to get it done.
        (('work . work)
         (if (q-empty? workers)
             (enq! works work)
             ;; If a worker is ready to receive work, take it out of
             ;; the queue (it will be not ready any longer) and give
             ;; it work.
             (let ((worker (deq! workers)))
               (put-message worker work))))
        ;; Unrecognized message? Raise an exception.
        (_ (raise (cons 'fuu message))))
      ;; Look for more messages.
      (loop (get-message channel)))))

(define-public (pool-init)
  (if %channel
      (error 'babelia "pool can not be initialized more than once")
      (let ((channel (make-channel)))
        (log-debug "pool init")
        (set! %channel channel)
        (let loop ((index worker-count))
          (unless (zero? index)
            (call-with-new-thread (lambda () (worker channel)))
            (loop (- index 1))))
        (arbiter channel))))

(define (publish thunk)
  "Create an answer channel, on which results will be returned. Put a
message on the global work distribution channel: tag: 'work, thunk:
job to perform, channel: channel on which to reply to the message."
  (let ((return (make-channel)))
    (put-message %channel (cons 'work (cons thunk return)))
    return))

(define-public (pool-apply thunk)
  "Execute THUNK in a worker thread.

   Pause the calling fiber until the result is available."
  (cdr (get-message (publish thunk))))

(define (select channels)
  (perform-operation
   (apply choice-operation (map get-operation channels))))

;; TODO: Maybe add a timeout argument, in order to be able to display
;; a nicer error.
(define-public (pool-for-each-par-map sproc pproc lst)
  "For each item of LST execute (PPROC item) in a worker thread, and
   gather returned value with SPROC. SPROC is executed in the calling
   fiber.

   This a POSIX thread pool based n-for-each-par-map for fibers. It is
   somewhat equivalent to:

     (for-each SSPROC (map PPROC LST))

   But applications of PPROC happens in parallel and waiting for new
   values is not blocking the main thread."
  ;; Look for responses in a loop to get all responses.
  (let loop
      ;; Get the channels, on which responses to messages will arrive.
      ((channels
        ;; Items are probably work to be done, wrapped as thunks.
        (map (lambda (item)
               (publish (lambda ()
                          ;; pproc is the procedure, which performs
                          ;; the work.
                          (pproc item))))
             ;; lst contains the work to be done.
             lst)))
    ;; Only do anything, if there are any channels to retrieve
    ;; responses from.
    (unless (null? channels)
      ;; Select one channel and get the response (result of work) from
      ;; it.
      (match (select channels)
        ((channel . value)
         (sproc value)
         (loop (remove (lambda (x) (eq? x channel)) channels)))
        (else (raise 'fuuubar))))))
